package org.xqh.study.mq.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName RabbitMQConsumer
 * @Description TODO
 * @Author xuqianghui
 * @Date 2022/10/13 15:27
 * @Version 1.0
 */
@Slf4j
public class RabbitMQConsumer {



    public static Map<String, AtomicInteger> countMap = new HashMap<String, AtomicInteger>(){{
        put("m1", new AtomicInteger(0));
        put("m2", new AtomicInteger(0));
    }};

    public static void main(String[] args) throws Exception {
        AtomicInteger consumeCount = new AtomicInteger(0);
        new Thread(()-> {
            try {
                consumeQueue("conn-1", 10, RabbitMQConfig.consumeTagA, "m1", consumeCount);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).start();

        new Thread(()-> {
            try {
                consumeQueue("conn-2", 20, RabbitMQConfig.consumeTagA, "m2", consumeCount);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).start();

        Thread.sleep(10000);
        System.out.println("consumer count: " + JSON.toJSONString(countMap));
    }

    public static void consumeQueue(String connName, int connId, String consumeTag, String title, AtomicInteger consumeCount) throws Exception{
        /**
         * 自定义连接的名称
         */
        Connection conn = RabbitMQConfig.initConnectionFactory().newConnection(connName);

        Channel channel = conn.createChannel(connId);

//        Channel channel = RabbitMQConfig.createConsumeChannel();
        channel.basicConsume(RabbitMQConfig.queueName, false, consumeTag, new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                countMap.get(title).incrementAndGet();
                String exchange = envelope.getExchange();
                String routingKey = envelope.getRoutingKey();
                System.out.println(String.format("%s get msg: %s", title, new String(body)));
                long deliveryTag = envelope.getDeliveryTag();
                consumeCount.incrementAndGet();
                channel.basicAck(deliveryTag, false);
            }
        });
        Thread.sleep(10000);
        channel.close();
        conn.close();
    }
}
